-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(source): prefer SpecificParserConfig
over SourceStruct
#12602
Conversation
Codecov Report
@@ Coverage Diff @@
## main #12602 +/- ##
==========================================
+ Coverage 68.76% 68.77% +0.01%
==========================================
Files 1495 1495
Lines 250159 250092 -67
==========================================
- Hits 172031 172011 -20
+ Misses 78128 78081 -47
Flags with carried forward coverage won't be shown. Click here to find out more.
... and 3 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
SourceStruct::new(SourceFormat::Plain, SourceEncode::Avro), | ||
info, | ||
with_properties, | ||
)?; | ||
let parser_config = SpecificParserConfig::new(info, with_properties)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the caller is actually passing in UpsertAvro today. Is the caller supposed to call extract_upsert_avro_table_schema
?
It does not affect correctness right now because only encoding_properties
field is used, and SpecificParserConfig::new
handles UpsertAvro and PlainAvro the same for encoding.
risingwave/src/frontend/src/handler/create_source.rs
Lines 532 to 559 in 6fb7ae5
let stream_source_info = StreamSourceInfo { | |
key_message_name, | |
format: FormatType::Upsert as i32, | |
row_encode: EncodeType::Avro as i32, | |
row_schema_location: avro_schema.row_schema_location.0.clone(), | |
use_schema_registry: avro_schema.use_schema_registry, | |
proto_message_name: message_name.unwrap_or(AstString("".into())).0, | |
upsert_avro_primary_key, | |
name_strategy, | |
..Default::default() | |
}; | |
let columns = | |
extract_avro_table_schema(&stream_source_info, with_properties).await?; | |
(Some(columns), sql_defined_pk_names, stream_source_info) | |
} else { | |
let stream_source_info = StreamSourceInfo { | |
format: FormatType::Upsert as i32, | |
row_encode: EncodeType::Avro as i32, | |
row_schema_location: avro_schema.row_schema_location.0.clone(), | |
use_schema_registry: avro_schema.use_schema_registry, | |
proto_message_name: message_name.unwrap_or(AstString("".into())).0, | |
name_strategy, | |
key_message_name, | |
..Default::default() | |
}; | |
let (columns, pk_from_avro) = | |
extract_upsert_avro_table_schema(&stream_source_info, with_properties).await?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dates back to first day upsert was supported: #8111
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The motivation LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall lgtm
Let's hold this PR for a bit until we figure out whether the inconsistency above is intentional. |
let source_struct = extract_source_struct(&self.source_info)?; | ||
let psrser_config = | ||
SpecificParserConfig::new(source_struct, &self.source_info, &self.properties)?; | ||
let psrser_config = SpecificParserConfig::new(&self.source_info, &self.properties)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let psrser_config = SpecificParserConfig::new(&self.source_info, &self.properties)?; | |
let parser_config = SpecificParserConfig::new(&self.source_info, &self.properties)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All updates seem bugless. LGTM.
According to a recent offline discussion, the special path of upsert-avro-with-sql_defined_pk pretending to be plain-avro is going to be removed. This refactor will be rebased after that is done. |
PR merged after digging and confirming an answer for the following question:
TLDR:
Back to the question, there were 3 options:
The call was made by upsert avro with user defined primary key in sql.
As a result, in cases where [1] Schema registry does allow key schema to be absent. In most parts of doc it uses |
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Preparation for the following goal:
enum ProtocolProperties
andenum SourceFormat
into a singleenum ConnectorFormat
(or derived similar to encode below when needed).enum EncodingProperties
andenum SourceEncode
intoenum ConnectorEncode
with derivedEnumDiscriminants
.In this first part, we observe that most callers are always calling these two in sequence:
extract_source_struct: PbStreamSourceInfo -> SourceStruct
SpecificParserConfig::new: (SourceStruct, PbStreamSourceInfo, HashMap<String, String>) -> Self
So we change it into a single call, not exposing the intermediate
SourceStruct
:SpecificParserConfig::new: (PbStreamSourceInfo, HashMap<String, String>) -> Self
Other places using
SourceStruct
:FsSourceDesc
does not usesource_struct
. It already containsSpecificParserConfig
insideFsConnectorSource
.DatagenEventGenerator
only supports json today. When it supports more encodings later, it would need the properties and having only enum discriminants is not enough.handle_alter_source_column
rejects avro or protobuf based on discriminant alone. It also rejects json with schema registry. It is unclear whether we need discriminant alone or the full properties. This usage has not been changed by this PR.In the second part, we will do trivial renames to consolidate the enums as mentioned above.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.